[Feature][Kafka source] Inject Kafka record timestamp as EventTime metadata #9994
base: dev
Conversation
This PR runs successfully on Flink, but the Spark E2E run fails. The Spark engine path loses options during the conversion between SeaTunnelRow and Spark Row, so the Metadata transform cannot retrieve EventTime; the output event_time_ms is therefore empty and the assertion fails. I need to confirm whether to fix this in this PR or open a separate PR to address the metadata loss in the Spark translation layer.
// Propagate Kafka record timestamp as metadata EventTime so downstream can materialize it
outputCollector.setCurrentRecordTimestamp(consumerRecord.timestamp());
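The emitter-side pattern can be sketched as follows. This is a minimal, self-contained illustration, not the actual SeaTunnel classes: `Row` stands in for `SeaTunnelRow` (modeling its metadata as a plain map) and `OutputCollector` stands in for the source collector; the idea is that the emitter records the per-record Kafka timestamp before deserialization, and `collect` stamps it onto the row as `EventTime`.

```java
import java.util.HashMap;
import java.util.Map;

public class EmitterSketch {
    // Stand-in for SeaTunnelRow: carries a metadata map.
    static class Row {
        final Map<String, Object> metadata = new HashMap<>();
    }

    // Stand-in for the source OutputCollector.
    static class OutputCollector {
        private long currentRecordTimestamp = -1L;

        // Called by the emitter once per ConsumerRecord, before collect().
        void setCurrentRecordTimestamp(long ts) {
            this.currentRecordTimestamp = ts;
        }

        // Stamp EventTime only when the Kafka timestamp is valid.
        void collect(Row row) {
            if (currentRecordTimestamp >= 0) {
                row.metadata.put("EventTime", currentRecordTimestamp);
            }
        }
    }

    public static void main(String[] args) {
        OutputCollector collector = new OutputCollector();
        collector.setCurrentRecordTimestamp(1730000000000L);
        Row row = new Row();
        collector.collect(row);
        System.out.println(row.metadata.get("EventTime")); // prints 1730000000000
    }
}
```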
Why not do it in the DeserializationSchema?
Thanks for the reminder. I'll check it later.
Purpose of this pull request
Motivation
Problem: In Kafka->Hive streaming, using CURRENT_DATE()/CURRENT_TIMESTAMP() misplaces records when replaying; parsing create_date is brittle due to dirty/mixed formats.
Goal: Reuse SeaTunnel’s metadata mechanism to inject Kafka ConsumerRecord.timestamp as EventTime, then let users materialize it via the Metadata transform for SQL/partitioning.
Design
In KafkaRecordEmitter: capture ConsumerRecord.timestamp per record; in OutputCollector.collect, if the record is a SeaTunnelRow and the timestamp is >= 0, call MetadataUtil.setEventTime(row, ts).
No schema change, no mandatory new options; injection is on by default. Users opt-in to materialize via the Metadata transform (e.g., mapping EventTime to kafka_ts).
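The collect-time guard described above can be sketched in isolation. This is an illustrative stand-in, not the real `MetadataUtil` API: a plain metadata map models the row metadata, and `putIfAbsent` models the "don't overwrite an EventTime the row already carries" behavior; `maybeSetEventTime` is a hypothetical helper name.

```java
import java.util.HashMap;
import java.util.Map;

public class CollectSketch {
    // Stand-in for SeaTunnelRow with a metadata map.
    static class SeaTunnelRowLike {
        final Map<String, Object> metadata = new HashMap<>();
    }

    // Guard: only SeaTunnelRow instances with a valid Kafka timestamp get
    // EventTime, and an existing EventTime is never overwritten.
    static void maybeSetEventTime(Object record, long kafkaTs) {
        if (!(record instanceof SeaTunnelRowLike) || kafkaTs < 0) {
            return;
        }
        SeaTunnelRowLike row = (SeaTunnelRowLike) record;
        row.metadata.putIfAbsent("EventTime", kafkaTs);
    }

    public static void main(String[] args) {
        SeaTunnelRowLike row = new SeaTunnelRowLike();
        maybeSetEventTime(row, 42L);
        System.out.println(row.metadata.get("EventTime")); // 42
        maybeSetEventTime(row, 99L); // existing value is kept
        System.out.println(row.metadata.get("EventTime")); // 42
        maybeSetEventTime(row, -1L); // invalid timestamp is ignored
        System.out.println(row.metadata.get("EventTime")); // 42
    }
}
```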
Does this PR introduce any user-facing change?
Yes. Users can materialize the injected EventTime via the Metadata transform and use it for partitioning and downstream transforms.
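A sketch of the opt-in materialization step in a SeaTunnel job config. The option name `metadata_fields` and the output column name `event_time_ms` are assumptions for illustration; consult the Metadata transform documentation for the exact option names.

```
transform {
  Metadata {
    # Assumed option: map the EventTime metadata key to an output column
    metadata_fields {
      EventTime = event_time_ms
    }
  }
}
```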
How was this patch tested?
Yes, covered by unit tests and E2E tests.
Check list
New License Guide